/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.tests.queryablestate;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * A simple implementation of a queryable state client.
 * This client queries the state for a while (~2.5 mins) and prints
 * out the values that it found in the map state
 *
 * <p>Usage: java -jar QsStateClient.jar --host HOST --port PORT --job-id JOB_ID
 */
public class QsStateClient {

	private static final int BOOTSTRAP_RETRIES = 240;

	public static void main(final String[] args) throws Exception {

		ParameterTool parameters = ParameterTool.fromArgs(args);

		// setup values
		String jobId = parameters.getRequired("job-id");
		String host = parameters.get("host", "localhost");
		int port = parameters.getInt("port", 9069);
		int numIterations = parameters.getInt("iterations", 1500);

		QueryableStateClient client = new QueryableStateClient(host, port);
		client.setExecutionConfig(new ExecutionConfig());

		MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
				new MapStateDescriptor<>(
						QsConstants.STATE_NAME,
						TypeInformation.of(new TypeHint<EmailId>() {

						}),
						TypeInformation.of(new TypeHint<EmailInformation>() {

						})
				);

		// wait for state to exist
		for (int i = 0; i < BOOTSTRAP_RETRIES; i++) { // ~120s
			try {
				getMapState(jobId, client, stateDescriptor);
				break;
			} catch (ExecutionException e) {
				if (e.getCause() instanceof UnknownKeyOrNamespaceException) {
					System.err.println("State does not exist yet; sleeping 500ms");
					Thread.sleep(500L);
				} else {
					throw e;
				}
			}

			if (i == (BOOTSTRAP_RETRIES - 1)) {
				throw new RuntimeException("Timeout: state doesn't exist after 120s");
			}
		}

		// query state
		for (int iterations = 0; iterations < numIterations; iterations++) {

			MapState<EmailId, EmailInformation> mapState =
				getMapState(jobId, client, stateDescriptor);

			int counter = 0;
			for (Map.Entry<EmailId, EmailInformation> entry: mapState.entries()) {
				// this is to force deserialization
				entry.getKey();
				entry.getValue();
				counter++;
			}
			System.out.println("MapState has " + counter + " entries"); // we look for it in the test

			Thread.sleep(100L);
		}
	}

	private static MapState<EmailId, EmailInformation> getMapState(
			String jobId,
			QueryableStateClient client,
			MapStateDescriptor<EmailId, EmailInformation> stateDescriptor) throws InterruptedException, ExecutionException {

		CompletableFuture<MapState<EmailId, EmailInformation>> resultFuture =
				client.getKvState(
						JobID.fromHexString(jobId),
						QsConstants.QUERY_NAME,
						QsConstants.KEY, // which key of the keyed state to access
						BasicTypeInfo.STRING_TYPE_INFO,
						stateDescriptor);

		return resultFuture.get();
	}
}
